【CDK】EventBridgeとS3をマッチングして特定パス・ファイル名でトリガーを制御する方法
はじめに
データアナリティクス事業本部ビッグデータチームのyosh-kです。
今回はS3にファイルがPutされたことをトリガーに起動するEventBridgeに、特定のパターン制限をかけたいと思います。
前提
今回実現したい構成は以下になります。要件としては、inputフォルダ直下のtest-20240629.csvやtest-20240630.csvの場合のみ起動し、空ファイルやフォルダ、サブフォルダで起動してほしくないことがあります。
- EventBridgeでdata-source Bucketへの
Object Created
イベントを検知し、Lambdaを起動します。- prefix:
input/
- ファイル名:
test-*.csv
- prefix:
- LambdaでトリガーとなったS3Pathを標準出力する。
実装
それではCDK実装になります。実装コードはリンクに格納しています。
@39_eventbridge_no_empty_and_folder_with_cdk % tree . ├── README.md ├── cdk │ ├── bin │ │ └── app.ts │ ├── cdk.json │ ├── jest.config.js │ ├── lib │ │ ├── constructs │ │ │ ├── eventbridge.ts │ │ │ ├── lambda.ts │ │ │ └── s3.ts │ │ └── stack │ │ └── event-trigger-test-stack.ts │ ├── package-lock.json │ ├── package.json │ ├── parameter.ts │ ├── test │ │ └── app.test.ts │ └── tsconfig.json ├── package-lock.json └── resources └── handler.py 8 directories, 15 files @39_eventbridge_no_empty_and_folder_with_cdk %
bin/app.ts
#!/usr/bin/env node import * as cdk from "aws-cdk-lib"; import { ETLStack } from "../lib/stack/event-trigger-test-stack"; import { devParameter, prodParameter } from "../parameter"; const app = new cdk.App(); // This context need to be specified in args const argContext = "environment"; let envKey = app.node.tryGetContext(argContext); if (envKey == undefined) { console.warn( `Warning: Environment key not specified, defaulting to 'dev'. For specifying environment, use context option. ex) cdk deploy -c ${argContext}=dev/prod` ); // throw new Error( // `Please specify environment with context option. ex) cdk deploy -c ${argContext}=dev` // ); envKey = "dev"; } let parameter; if (envKey === "dev") { parameter = devParameter; } else { parameter = prodParameter; } new ETLStack(app, `CMKasamaETL${envKey.toUpperCase()}`, { description: `${parameter.projectName}-${parameter.envName}-test-tag`, env: { account: process.env.CDK_DEFAULT_ACCOUNT, region: process.env.CDK_DEFAULT_REGION, }, tags: { Repository: `${parameter.projectName}-${parameter.envName}-test-tag`, Environment: parameter.envName, }, projectName: parameter.projectName, envName: parameter.envName, });
envKey
:cdk deploy
コマンド実行時に引数としてenvironmentを受け取ります。environmentの値に応じてパラメータを設定することで環境に応じた設定を実現するようにしています。envKeyコマンドライン引数がなければデフォルトでdevを設定するようにしていますが、プロジェクトに応じては、引数がなければエラーで返す実装でも良いと思います。description
: CloudFormation StackのDescriptionとなります。env
: deploy先のaccoun, regionを設定します。parameterで定義していなければdeployコマンド実行環境のデフォルト値を設定します。tag
: 作成されるリソースに対してのタグを設定します。projectName, envName
: 処理の中でリソース名の一部として使用します。
lib/constructs/eventbridge.ts
import { Construct } from "constructs"; import * as events from "aws-cdk-lib/aws-events"; import * as eventsTargets from "aws-cdk-lib/aws-events-targets"; import * as lambda from "aws-cdk-lib/aws-lambda"; import * as iam from "aws-cdk-lib/aws-iam"; export interface EventBridgeConstructProps { envName: string; projectName: string; dataSourceBucketName: string; lambdaFunctionArn: string; } export class EventBridgeConstruct extends Construct { constructor(scope: Construct, id: string, props: EventBridgeConstructProps) { super(scope, id); // Lambda 関数の参照を先に行う const lambdaFunction = lambda.Function.fromFunctionAttributes( this, "ImportedLambdaFunction", { functionArn: props.lambdaFunctionArn, sameEnvironment: true, } ); // EventBridge が Lambda を呼び出すための IAM ロールを作成 const eventBridgeRole = new iam.Role(this, "EventBridgeInvokeLambdaRole", { assumedBy: new iam.ServicePrincipal("events.amazonaws.com"), roleName: `${props.projectName}-${props.envName}-eventbridge-invoke-lambda-role`, description: "IAM role for EventBridge to invoke Lambda function", }); // Lambda 関数を呼び出す権限をロールに付与 lambdaFunction.grantInvoke(eventBridgeRole); const s3PutRule = new events.Rule(this, "S3PutRule", { description: `Triggers Lambda function when a file is uploaded to the ${props.dataSourceBucketName} bucket`, eventPattern: { source: ["aws.s3"], detailType: ["Object Created"], detail: { bucket: { name: [props.dataSourceBucketName], }, object: { key: [ { prefix: "input/test-", }, { suffix: ".csv", }, ], // key: [ // { // wildcard: "input/test-*.csv", // }, // ], size: [{ numeric: [">", 0] }], }, }, }, ruleName: `${props.projectName}-${props.envName}-s3-put-rule`, }); s3PutRule.addTarget(new eventsTargets.LambdaFunction(lambdaFunction, {})); } }
EventBridge, EventBridge用のIAM Role, IAM RoleのPolicyの定義をしています。今回の要件に沿ったイベントパターンとなるように実装しています。preifxとsuffixを用いたパターンとコメントアウトされているwildcardを用いたパターンは同様の動作をする想定のため、それぞれ後ほど確認します。
lib/constructs/lambda.ts
import * as cdk from "aws-cdk-lib"; import * as lambda from "aws-cdk-lib/aws-lambda"; import { Construct } from "constructs"; import * as iam from "aws-cdk-lib/aws-iam"; export interface LambdaConstructProps { envName: string; projectName: string; } export class LambdaConstruct extends Construct { public readonly lambdaArn: string; constructor(scope: Construct, id: string, props: LambdaConstructProps) { super(scope, id); const lambdaRole = new iam.Role(this, "LambdaExecutionRole", { assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"), managedPolicies: [ iam.ManagedPolicy.fromAwsManagedPolicyName( "service-role/AWSLambdaBasicExecutionRole" ), ], }); const lambdaName = `${props.projectName}-${props.envName}-event-trigger-check-handler`; const lambdaFunction = new lambda.Function(this, "EtlHandler", { functionName: lambdaName, runtime: lambda.Runtime.PYTHON_3_12, code: lambda.Code.fromAsset("../resources/"), handler: "handler.main", memorySize: 512, timeout: cdk.Duration.seconds(900), role: lambdaRole, architecture: lambda.Architecture.ARM_64, }); this.lambdaArn = lambdaFunction.functionArn; } }
Lambda、Lambda用IAM Roleを定義しています。
lib/constructs/s3.ts
import * as cdk from "aws-cdk-lib"; import { Bucket, BlockPublicAccess, BucketEncryption, } from "aws-cdk-lib/aws-s3"; import { Construct } from "constructs"; export interface S3ConstructProps { envName: string; projectName: string; } export class S3Construct extends Construct { public readonly dataSourceBucket: Bucket; public readonly dataStoreBucket: Bucket; constructor(scope: Construct, id: string, props: S3ConstructProps) { super(scope, id); this.dataSourceBucket = new Bucket(this, "DataSourceBucket", { bucketName: `${props.projectName}-${props.envName}-data-source`, removalPolicy: cdk.RemovalPolicy.DESTROY, blockPublicAccess: BlockPublicAccess.BLOCK_ALL, encryption: BucketEncryption.KMS_MANAGED, versioned: true, eventBridgeEnabled: true, }); } }
トリガーとなるファイルを格納するBucketを定義しています。Bucketを他のConstructで参照するため、public readonlyで宣言します。
lib/stack/event-trigger-test-stack.ts
import { Construct } from "constructs"; import * as cdk from "aws-cdk-lib"; import { S3Construct } from "../constructs/s3"; import { LambdaConstruct, LambdaConstructProps } from "../constructs/lambda"; import { EventBridgeConstruct, EventBridgeConstructProps, } from "../constructs/eventbridge"; export interface ETLStackProps extends cdk.StackProps { envName: string; projectName: string; } export class ETLStack extends cdk.Stack { constructor(scope: Construct, id: string, props: ETLStackProps) { super(scope, id, props); const s3Construct = new S3Construct(this, "S3", { envName: props.envName, projectName: props.projectName, }); const lambdaConstruct = new LambdaConstruct(this, "Lambda", { envName: props.envName, projectName: props.projectName, } as LambdaConstructProps); new EventBridgeConstruct(this, "EventBridge", { envName: props.envName, projectName: props.projectName, dataSourceBucketName: s3Construct.dataSourceBucket.bucketName, lambdaFunctionArn: lambdaConstruct.lambdaArn, } as EventBridgeConstructProps); } }
上記ファイルではStackを定義しその中でConstructとしてリソースを定義しています。S3 Bucket、Lambda、 EventBridgeで依存関係があるため、上記の順となっています。bin/app.ts
から取得したenvName, projectNameは全てのConstructで活用するため、引数として指定しています。
parameter.ts
import { Environment } from "aws-cdk-lib"; // Parameters for Application export interface AppParameter { env: Environment; envName: string; projectName: string; } // Example export const devParameter: AppParameter = { envName: "dev", projectName: "cm-kasama", env: {}, }; export const prodParameter: AppParameter = { envName: "prod", projectName: "cm-kasama", env: {}, // env: { account: "xxxxxx", region: "ap-northeast-1" }, };
環境変数を定義するためのファイルとなります。accountには実際のAWS_ACCOUNT_IDを記載します。そのほかには、evnNameやprojectNameなどは一括で修正できるように変数として定義しています。私の場合はprofileで指定したIAM Roleに紐づくAWS Account、Regionへデプロイするためenvを空で設定しています。
resources/handler.py
def main(event, context): print("event:", event)
EventBridgeのトリガーとなったS3Pathを表示するためにeventを標準出力しています。
デプロイ
package.jsonがあるディレクトリで依存関係をインストールします。
npm install
npx cdk deploy --all --require-approval never -c environment=dev --profile <YOUR_AWS_PROFILE>
デプロイ後のEventBridgeのイベントパターン
実行結果
prefixとsuffix活用パターン
それではS3に対し以下のアクションを行い、検証してきたいと思います。
- inputフォルダをAWS Management Consoleから作成。
- input/test-20240629.csvファイル(値あり)格納
- input/testフォルダ作成。
- input/test/test.csvファイル格納。
- input/test-20240628.csvファイル(0byte)格納
input/test-20240629.csvファイル(値あり)格納時のみLambdaが起動する想定です。実際のCloudWatch logを確認しましたが、想定通り1回の起動のみで、test-20240629.csvファイルであることを確認しました。
wildcard活用パターン
コメントアウトしていたwildcardパターンで試してみたいと思います。
- inputフォルダをAWS Management Consoleから作成。
- input/test-20240629.csvファイル(値あり)格納
- input/testフォルダ作成。
- input/test/test.csvファイル格納。
- input/test-20240628.csvファイル(0byte)格納
input/test-20240629.csvファイル(値あり)格納時のみLambdaが起動する想定です。実際のCloudWatch logを確認しましたが、想定通り1回の起動のみで、test-20240629.csvファイルであることを確認しました。
最後に
EventBridgeで特定の条件で絞り込むことはよくあるので書き留めておきたい目的がありました。どなたかの参考になりましたら幸いです。